perf: implement batch processing in iterateEvalTree#406
Conversation
|
@seqbenchbot up main search-keyword-exact-match-warm |
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## 329-batching-1 #406 +/- ##
==================================================
- Coverage 71.54% 70.58% -0.97%
==================================================
Files 220 221 +1
Lines 16568 20423 +3855
==================================================
+ Hits 11854 14415 +2561
- Misses 3840 5128 +1288
- Partials 874 880 +6 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
|
@seqbenchbot down e8eefca9 |
|
Nice, @cheb0 The benchmark with identificator Show summary
Have a great time! |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| for _, lid := range lids { | ||
| rawLid := lid.Unpack() | ||
| blockIdx := p.table.GetIDBlockIndexByLID(rawLid) | ||
| if p.midCache.blockIndex != int(blockIdx) { |
There was a problem hiding this comment.
nit: fillMIDs has this check inside. did you add it to avoid function call?
| // Get MIDs | ||
| if needMids > 0 { | ||
| timerMID.Start() | ||
| mids = idsIndex.GetMIDs(lidsSlice[0:needMids], mids[:0]) |
There was a problem hiding this comment.
nit: technically we can omit the lower bound if it equals 0
lidsSlice[:needMids]| return seq.MID(p.midCache.GetValByLID(uint32(lid))), nil | ||
| } | ||
|
|
||
| func (p *Provider) MIDs(lids []node.LID, out []seq.MID) ([]seq.MID, error) { |
There was a problem hiding this comment.
Why Provider has method for retrieving a batch of MID but there is no similar method for RID?
| defer searchBuffersPool.Put(buffers) | ||
| mids := buffers.mids | ||
| rids := buffers.rids | ||
| lidsBuffer := buffers.lids |
There was a problem hiding this comment.
Shouldn't you reset buffers since slices are reused?
There was a problem hiding this comment.
looks like no, since I do not reassign it
| lidsBuf := lidsBuf{ | ||
| lids: make([]node.LID, 0, consts.LIDBlockCap), | ||
| } | ||
| return searchBuffers{ |
There was a problem hiding this comment.
It's better to return a pointer here, otherwise there will be unnecessary allocations since any is returned.
| filterMIDs := sw.Timer("filter_mids") | ||
| updateHist := sw.Timer("update_hist") |
There was a problem hiding this comment.
| filterMIDs := sw.Timer("filter_mids") | |
| updateHist := sw.Timer("update_hist") | |
| timerFilterMIDs := sw.Timer("filter_mids") | |
| timerUpdateHist := sw.Timer("update_hist") |
| @@ -47,12 +49,23 @@ type LIDsIter interface { | |||
There was a problem hiding this comment.
| LIDs(out []node.LID) []node.LID |
There was a problem hiding this comment.
I'll leave it here since it is out of scope of this diff.
Take a look at https://github.com/ozontech/seq-db/blob/329-batching-iterate-eval-tree/frac/sealed/lids/iterator_desc.go#L121-L131 -- I guess you've introduced code duplication while performing rebase.
| return total, ids, hist, aggs, nil | ||
| } | ||
|
|
||
| func filterOutOfRangeMIDs(params SearchParams, mids []seq.MID, lidsSlice []node.LID) ([]seq.MID, []node.LID) { |
There was a problem hiding this comment.
I am not sure what purpose this function serves.
Per my understanding, we cannot iterate over seq.LID which correspond to seq.ID that lie outside of user-requested range [from; to] -- this is guaranteed because we calculate minLID and maxLID in getLIDsBorders and use those in all iterators to set boundaries.
Am I missing something?
There was a problem hiding this comment.
This check is right from the original implementation, I have not added it: https://github.com/ozontech/seq-db/blob/main/frac/processor/search.go#L229.
The only thing it does is converting potential panic to error if minLID and maxLID deriving does not work somehow. And it's enabled only for histrograms. But it's more appropriate to panic in this case. Basically, it looks useless to me.
There was a problem hiding this comment.
It seems I added the original check when I was refactoring the histogram from a map to a slice, just to make sure the new approach wouldn't go out of bounds (being cautious).
I suggest either removing this check now, or moving it inside hist.Update(mids). Not exactly that check, but rather a bounds check — ensuring we don't go out of slice bounds.
| buffers := searchBuffersPool.Get().(searchBuffers) | ||
| defer searchBuffersPool.Put(buffers) | ||
| mids := buffers.mids | ||
| rids := buffers.rids |
There was a problem hiding this comment.
Starting a petition to protect Vim users and their descendants — we require spaces. This is how we navigate code. Thank you for your cooperation.
Maybe something like?
var (
total int
lastID seq.ID
ids seq.IDSources
)
buffers := searchBuffersPool.Get().(searchBuffers)
defer searchBuffersPool.Put(buffers)| } | ||
| // limit how much we drain from eval tree for one-by-one flow. ignored for batched flow | ||
| need = min(need, maxLidsToDrain) | ||
| needLids = min(needLids, maxLidsToDrain) |
There was a problem hiding this comment.
Maybe we can move this whole thing with calculating limits/offsets/etc to the batch? I mean something like:
if ok {
evalTreeIter = func(need int, _ lidsBuf) LIDsIter {
// batched flow: juts get a batch and return
return batchNode.NextBatch().Trim(need)
// Or return batchNode.NextBatch(need)
}
} else {
...
}
func (b LIDBatch) Trim(k int) LIDBatch {
b.lids = b.lids[:min(k, len(b.lids))]
return b
}There was a problem hiding this comment.
I would prefer batchNode.NextBatch(need) but it needs more work to do.
|
|
||
| timerEval.Start() | ||
| lidBatch := evalTree(need, buf) | ||
| lidBatch := evalTree(needLids, lidsBuffer) |
There was a problem hiding this comment.
The current implementation of this function is overly confusing.
Issues:
- The needLids parameter is unused in one branch, while lidsBuffer is unused in another branch.
- The line lidsSlice := lidBatch.LIDs(lidsBuffer.lids) reuses the buffer, but one of the LIDsIter implementations (lidsBuf) ignores this parameter.
- The logic is hard not only to read but even to explain verbally.
Proposed simplification:
- Remove the LIDsIter interface.
- Remove lidsBuf.
- Extract the wrapper into a separate function, e.g., batcher.
- The batcher function should directly return []node.LIDs.
- Implement a reusable buffer inside that function using a closure.
This will make the code clearer and eliminate the confusion caused by unused arguments in different branches.
diff --git a/frac/processor/search.go b/frac/processor/search.go
index c4836c3e..d7fe6e0a 100644
--- a/frac/processor/search.go
+++ b/frac/processor/search.go
@@ -44,26 +44,18 @@ type searchIndex interface {
GetSkipLIDs(minLID, maxLID uint32, reverse bool) (node.Node, bool, error)
}
-type LIDsIter interface {
- LIDs(out []node.LID) []node.LID
- Len() int
-}
-
type searchBuffers struct {
- lids lidsBuf
+ lids []node.LID
mids []seq.MID
rids []seq.RID
}
var searchBuffersPool = sync.Pool{
New: func() any {
- lidsBuf := lidsBuf{
- lids: make([]node.LID, 0, consts.LIDBlockCap),
- }
return &searchBuffers{
// Currently, we drain up to 4k lids from eval tree, but with proper batching enabled
// we can get as much as whole LID block can have (currently, 64k lids)
- lids: lidsBuf,
+ lids: make([]node.LID, 0, consts.LIDBlockCap),
mids: make([]seq.MID, 0, consts.LIDBlockCap),
rids: make([]seq.RID, 0, consts.LIDBlockCap),
}
@@ -142,30 +134,8 @@ func IndexSearch(
m.Stop()
}
- var evalTreeIter func(need int, out lidsBuf) LIDsIter
- batchNode, ok := tryConvertToBatchedTree(evalTree)
-
- if ok {
- evalTreeIter = func(need int, _ lidsBuf) LIDsIter {
- // batched flow: juts get a batch and return
- return batchNode.NextBatch()
- }
- } else {
- evalTreeIter = func(need int, buf lidsBuf) LIDsIter {
- // iterator flow: buffer LIDs one by one and return a batch
- for i := 0; i < need; i++ {
- lid := evalTree.Next()
- if lid.IsNull() {
- break
- }
- buf = buf.append(lid)
- }
- return buf
- }
- }
-
m = sw.Start("iterate_eval_tree")
- total, ids, histMap, aggs, err := iterateEvalTree(ctx, params, index, evalTreeIter, aggSupplier, sw)
+ total, ids, histMap, aggs, err := iterateEvalTree(ctx, params, index, evalTree, aggSupplier, sw)
m.Stop()
if err != nil {
@@ -207,11 +177,36 @@ func IndexSearch(
return qpr, nil
}
+func batcher(evalTree node.Node, buf []node.LID) func(need int) []node.LID {
+ if batchNode, ok := tryConvertToBatchedTree(evalTree); ok {
+ return func(need int) []node.LID {
+ buf = batchNode.NextBatch().LIDs(buf[:0])
+ if len(buf) > need {
+ buf = buf[:need]
+ }
+ return buf
+ }
+ }
+
+ return func(need int) []node.LID {
+ // iterator flow: buffer LIDs one by one and return a batch
+ buf = buf[:0]
+ for range min(maxLidsToDrain, need) {
+ lid := evalTree.Next()
+ if lid.IsNull() {
+ break
+ }
+ buf = append(buf, lid)
+ }
+ return buf
+ }
+}
+
func iterateEvalTree(
ctx context.Context,
params SearchParams,
idsIndex idsIndex,
- evalTree func(need int, buf lidsBuf) LIDsIter,
+ evalTree node.Node,
aggSupplier func() ([]Aggregator, error),
sw *stopwatch.Stopwatch,
) (int, seq.IDSources, HistMap, []Aggregator, error) {
@@ -233,7 +228,8 @@ func iterateEvalTree(
defer searchBuffersPool.Put(buffers)
mids := buffers.mids
rids := buffers.rids
- lidsBuffer := buffers.lids
+
+ batchedEvalTree := batcher(evalTree, buffers.lids)
timerEval := sw.Timer("eval_tree_next")
timerMID := sw.Timer("get_mid")
@@ -256,19 +252,15 @@ func iterateEvalTree(
if needScanAllRange {
needLids = math.MaxUint32
}
- // limit how much we drain from eval tree for one-by-one flow. ignored for batched flow
- needLids = min(needLids, maxLidsToDrain)
timerEval.Start()
- lidBatch := evalTree(needLids, lidsBuffer)
+ lidsSlice := batchedEvalTree(needLids)
timerEval.Stop()
- if lidBatch.Len() == 0 {
+ if len(lidsSlice) == 0 {
break
}
- lidsSlice := lidBatch.LIDs(lidsBuffer.lids)
-
needMids := min(params.Limit-len(ids), len(lidsSlice))
if hasHist {
// need to fetch mids for all lids for hist
@@ -377,26 +369,6 @@ func filterOutOfRangeMIDs(params SearchParams, mids []seq.MID, lidsSlice []node.
return mids, lidsSlice
}
-// lidsBuf maintains node.LID in slice as is (append order).
-// Used to drain batches of LIDs when eval tree doesn't support batching.
-type lidsBuf struct {
- lids []node.LID
-}
-
-func (b lidsBuf) append(x node.LID) lidsBuf {
- return lidsBuf{
- lids: append(b.lids, x),
- }
-}
-
-func (b lidsBuf) Len() int {
- return len(b.lids)
-}
-
-func (b lidsBuf) LIDs(_ []node.LID) []node.LID {
- return b.lids
-}
-
func tryConvertToBatchedTree(evalTree node.Node) (node.BatchedNode, bool) {
switch it := evalTree.(type) {
case *lids.IteratorDesc:| bucketIndex := uint64(mid)/uint64(histInterval) - histBase | ||
| histogram[bucketIndex]++ | ||
| } | ||
| needMids := min(params.Limit-len(ids), len(lidsSlice)) |
There was a problem hiding this comment.
nit: naming:
- needMids -> needMIDs or midsNeeded
- needLids -> needLIDs or lidsNeeded
- needIds -> needIDs or idsNeeded
| return seq.MID(p.midCache.GetValByLID(uint32(lid))), nil | ||
| } | ||
|
|
||
| func (p *Provider) MIDs(lids []node.LID, out []seq.MID) ([]seq.MID, error) { |
There was a problem hiding this comment.
nit: The only place where MIDs is called is sealedIDsIndex.GetMIDs — and it's just a wrapper:
func (ii *sealedIDsIndex) GetMIDs(lidsBatch []node.LID, out []seq.MID) []seq.MID {
mids, err := ii.provider.MIDs(lidsBatch, out)
if err != nil {
logger.Panic("get mids error", zap.String("frac", ii.fracName), zap.Int("lids_count", len(lidsBatch)), zap.Error(err))
}
return mids
}Why not just implement this logic directly inside sealedIDsIndex.GetMIDs?
As it stands, it's unclear why we created a new package dependency — seqids on node (because of node.LID).
Description
Continuation of #390
iterateEvalTreeworks with batches of lids, requests batches of mids and ridsget_midstepI did some measurements for both patches (this combined with #390) vs main (used bitpack encoding in both branches). For small ordinary searches there is no benefit. For dense analytic queries there is a decent improvement.
For our k6 benchmark
seq-db-hist.js:2.3 sec=>650 msFor
seq-db-aggs.js:6.1 sec=>4.7 secHist over
_all_(warm query) (3 prod fractions):~37 ms=>~15 msPart of #329